/*
 * Copyright 2016-2023 ClickHouse, Inc.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


/*
 * This file may have been modified by Bytedance Ltd. and/or its affiliates (“ Bytedance's Modifications”).
 * All Bytedance's Modifications are Copyright (2023) Bytedance Ltd. and/or its affiliates.
 */

#pragma once

#include <algorithm>
#include <cmath>
#include <deque>
#include <memory>
#include <optional>
#include <vector>
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/MergeTree/LateMaterialize/MergeTreeRangeReaderLM.h>

namespace DB
{

/// Forward declarations. MergeTreeMetaBase is only declared here (not defined in this header);
/// MergeTreeReadTask and MergeTreeBlockSizePredictor are defined further down in this header.
class MergeTreeMetaBase;
struct MergeTreeReadTask;
struct MergeTreeBlockSizePredictor;

/// Uniquely-owning pointer aliases for the two structs defined in this header.
using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
using MergeTreeBlockSizePredictorPtr = std::unique_ptr<MergeTreeBlockSizePredictor>;


/** Handles requested columns that are absent from the part:
  * works out which additional columns may have to be read so that the
  * DEFAULT expressions of the absent columns can be calculated,
  * and appends those additional columns to `columns`.
  *
  * NOTE(review): the exact meaning of the returned NameSet and of
  * `default_injected_column` is not visible in this header - see the definition.
  */
NameSet injectRequiredColumns(
    const MergeTreeMetaBase & storage,
    const StorageMetadataPtr & metadata_snapshot,
    const MergeTreeMetaBase::DataPartPtr & part,
    Names & columns,
    const String & default_injected_column);


/// Column sets required by a read task, grouped by the stage in which they are read.
struct MergeTreeReadTaskColumns
{
    /// column names to read during WHERE
    NamesAndTypesList columns;
    /// column names to read during PREWHERE
    NamesAndTypesList pre_columns;
    /// bitmap columns read during WHERE
    NameSet bitmap_index_columns;
    /// bitmap columns read during PREWHERE
    NameSet bitmap_index_pre_columns;
    /// resulting block may require reordering in accordance with `ordered_names`.
    /// Explicitly defaulted to false so that a default-constructed object never
    /// carries an indeterminate value.
    bool should_reorder = false;
    /// column names to read in each stage in chain when using early materialize read
    /// NOTE(review): this header includes LateMaterialize/MergeTreeRangeReaderLM.h -
    /// confirm whether "late materialize" is what is meant here.
    std::vector<NamesAndTypesList> per_stage_columns;
    /// nameset for bitmap column for each stage
    std::vector<NameSet> per_stage_bitmap_nameset;
};

/// One unit (batch) of work handed to MergeTreeThreadSelectBlockInputStream.
struct MergeTreeReadTask
{
    /// The data part this task reads from.
    MergeTreeMetaBase::DataPartPtr data_part;
    /// Bitmap used to drop deleted rows of the part; may be nullptr when the part has no deleted rows.
    ImmutableDeleteBitmapPtr delete_bitmap;
    /// Mark ranges of `data_part` to be read in one batch.
    MarkRanges mark_ranges_once_read;
    /// Value backing the virtual `part_index` column.
    size_t part_index_in_query;
    /// Ordered column names used by the query; lets returned blocks keep a consistent column order.
    const Names & ordered_names;
    /// Decides whether a column is filtered during PREWHERE or during WHERE.
    const NameSet & column_name_set;
    /// Columns this task has to read.
    const MergeTreeReadTaskColumns & task_columns;
    /// Whether the PREWHERE column should be returned to the requesting side.
    const bool remove_prewhere_column;
    /// Whether the resulting block may need reordering according to `ordered_names`.
    const bool should_reorder;
    /// Helps to satisfy the preferred_block_size_bytes limitation.
    MergeTreeBlockSizePredictorPtr size_predictor;
    /// Keep the processing state of the current range.
    MergeTreeRangeReader range_reader;
    MergeTreeRangeReader pre_range_reader;
    /// When non-null, isFinished() consults this reader instead of `range_reader`.
    /// NOTE(review): raw pointer - ownership is not visible in this header.
    MergeTreeRangeReaderLM * msr_range_reader = nullptr;
    /// Total ranges to read; used to calculate buffer size and the data ranges cached by the disk cache.
    MarkRanges mark_ranges_total_read;

    /// True once no batch ranges remain and the active range reader
    /// (`msr_range_reader` if set, otherwise `range_reader`) has finished its current range.
    bool isFinished() const
    {
        /// Pending ranges mean more work; the reader is not queried in that case.
        if (!mark_ranges_once_read.empty())
            return false;

        return msr_range_reader
            ? msr_range_reader->isCurrentRangeFinished()
            : range_reader.isCurrentRangeFinished();
    }

    MergeTreeReadTask(
        const MergeTreeMetaBase::DataPartPtr & data_part_,
        ImmutableDeleteBitmapPtr delete_bitmap_,
        const MarkRanges & mark_ranges_once_read_,
        const size_t part_index_in_query_,
        const Names & ordered_names_,
        const NameSet & column_name_set_,
        const MergeTreeReadTaskColumns & task_columns_,
        bool remove_prewhere_column_,
        bool should_reorder_,
        MergeTreeBlockSizePredictorPtr && size_predictor_,
        const MarkRanges & mark_ranges_total_read_);

    virtual ~MergeTreeReadTask();
};

/// Produces the MergeTreeReadTaskColumns for a data part.
/// Only the declaration is visible here; the definition lives elsewhere.
/// NOTE(review): presumably splits `required_columns` into the PREWHERE / WHERE / per-stage
/// sets of MergeTreeReadTaskColumns using `prewhere_info`, `index_context` and
/// `atomic_predicates`, and validates columns when `check_columns` is set -
/// confirm against the definition.
MergeTreeReadTaskColumns getReadTaskColumns(
    const MergeTreeMetaBase & storage,
    const StorageSnapshotPtr & storage_snapshot,
    const MergeTreeMetaBase::DataPartPtr & data_part,
    const Names & required_columns,
    const PrewhereInfoPtr & prewhere_info,
    const MergeTreeIndexContextPtr & index_context,
    const std::deque<AtomicPredicatePtr> & atomic_predicates,
    bool check_columns);

/// Keeps per-row size statistics of the block being read and estimates how many
/// more rows can be read before a given byte quota is exhausted.
struct MergeTreeBlockSizePredictor
{
    MergeTreeBlockSizePredictor(const MergeTreeMetaBase::DataPartPtr & data_part_, const Names & columns, const Block & sample_block);

    /// Reset some values for correct statistics calculating
    void startBlock();

    /// Updates statistic for more accurate prediction
    void update(const Block & sample_block, const Columns & columns, size_t num_rows,
        bool size_predictor_estimate_lc_size_by_fullstate, double decay = DECAY());

    /// Return current block size (after update())
    size_t getBlockSize() const
    {
        return block_size_bytes;
    }


    /// Predicts what number of rows should be read to exhaust byte quota per column.
    /// Returns 0 when the rows already in the block consume the quota.
    size_t estimateNumRowsForMaxSizeColumn(size_t bytes_quota) const
    {
        /// The per-row size is at least 1 so the division below is never by zero.
        double max_size_per_row = std::max<double>(std::max<size_t>(max_size_per_row_fixed, 1), max_size_per_row_dynamic);
        if (!(bytes_quota > block_size_rows * max_size_per_row))
            return 0;

        /// The comparison above is done in floating point; rounding could make the truncated
        /// quotient smaller than block_size_rows, so clamp instead of letting the unsigned
        /// subtraction wrap around.
        const size_t rows_in_quota = static_cast<size_t>(bytes_quota / max_size_per_row);
        return rows_in_quota > block_size_rows ? rows_in_quota - block_size_rows : 0;
    }

    /// Predicts what number of rows should be read to exhaust byte quota per block.
    /// Returns 0 when the current block already reached the quota.
    size_t estimateNumRows(size_t bytes_quota) const
    {
        if (bytes_quota <= block_size_bytes)
            return 0;

        /// bytes_per_row_current is truncated to an integer (at least 1) before dividing.
        const size_t bytes_per_row = std::max<size_t>(1, static_cast<size_t>(bytes_per_row_current));
        return (bytes_quota - block_size_bytes) / bytes_per_row;
    }

    /// Updates `filtered_rows_ratio` from the latest read: a lower observed ratio replaces the
    /// stored one immediately, a higher one is blended in with weight (1 - (1 - decay)^rows_was_read).
    void updateFilteredRowsRation(size_t rows_was_read, size_t rows_was_filtered, double decay = DECAY())
    {
        double alpha = std::pow(1. - decay, rows_was_read);
        /// The denominator is clamped to 1 so that zero rows read does not divide by zero.
        double current_ration = static_cast<double>(rows_was_filtered) / std::max(1.0, static_cast<double>(rows_was_read));
        filtered_rows_ratio = current_ration < filtered_rows_ratio
            ? current_ration
            : alpha * filtered_rows_ratio + (1.0 - alpha) * current_ration;
    }

    /// Aggressiveness of bytes_per_row updates. See update() implementation.
    /// After n=NUM_UPDATES_TO_TARGET_WEIGHT updates v_{n} = (1 - TARGET_WEIGHT) * v_{0} + TARGET_WEIGHT * v_{target}
    static constexpr double TARGET_WEIGHT = 0.5;
    static constexpr size_t NUM_UPDATES_TO_TARGET_WEIGHT = 8192;
    static double DECAY() { return 1. - std::pow(TARGET_WEIGHT, 1. / NUM_UPDATES_TO_TARGET_WEIGHT); }

protected:

    MergeTreeMetaBase::DataPartPtr data_part;

    /// Size statistics tracked for one column.
    struct ColumnInfo
    {
        String name;
        double bytes_per_row_global = 0;
        double bytes_per_row = 0;
        size_t size_bytes = 0;
    };

    std::vector<ColumnInfo> dynamic_columns_infos;
    size_t fixed_columns_bytes_per_row = 0;

    size_t max_size_per_row_fixed = 0;
    double max_size_per_row_dynamic = 0;

    /// Defaulted to 0 so the member never holds an indeterminate value.
    size_t number_of_rows_in_part = 0;

    bool is_initialized_in_update = false;

    /// Declared here, defined elsewhere.
    void initialize(const Block & sample_block, const Columns & columns, const Names & names,
        bool from_update = false, bool size_predictor_estimate_lc_size_by_fullstate = true);

public:

    size_t block_size_bytes = 0;
    size_t block_size_rows = 0;

    /// Total statistics
    double bytes_per_row_current = 0;
    double bytes_per_row_global = 0;

    double filtered_rows_ratio = 0;
};

}
